package me.seu.demo.transaction;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * RocketMQ本地事务监听器
 *
 * @author liangfeihu
 * @since 2020/4/17 11:03
 */
@Slf4j
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<String, Integer>();

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 获取消息体字符串
        String msgBody = getMessageBody(msg);
        String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(transId, status);
        log.info("#### executeLocalTransaction is executed, msgTransactionId={} status={}", transId, status);
        if (status == 0) {
            // Return local transaction with success(commit), in this case,
            // this message will not be checked in checkLocalTransaction()

            log.info("【COMMIT】Simulating msg=[{}] related local transaction exec succeeded! ", msgBody);
            return RocketMQLocalTransactionState.COMMIT;
        } else if (status == 1) {
            // Return local transaction with failure(rollback) , in this case,
            // this message will not be checked in checkLocalTransaction()

            log.info("【ROLLBACK】 Simulating msg=[{}] related local transaction exec failed! ", msgBody);
            return RocketMQLocalTransactionState.ROLLBACK;
        }

        log.info("【UNKNOWN】 Simulating msg=[{}] related local transaction exec UNKNOWN!", msgBody);
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
        Integer status = localTrans.get(transId);
        if (null != status) {
            switch (status) {
                case 0:
                    retState = RocketMQLocalTransactionState.UNKNOWN;
                    break;
                case 1:
                    retState = RocketMQLocalTransactionState.COMMIT;
                    break;
                case 2:
                    retState = RocketMQLocalTransactionState.ROLLBACK;
                    break;
            }
        }
        log.info("### !!! checkLocalTransaction is executed once, msgTransactionId={}, TransactionState={} status={} ", transId, retState, status);
        return retState;
    }

    /**
     * 获取消息体字符串
     *
     * @param msg
     * @return
     */
    private String getMessageBody(Message msg) {
        Object payloadObj = msg.getPayload();
        String msgBody = null;
        try {
            byte[] payloads;
            if (null == payloadObj) {
                throw new RuntimeException("the message cannot be empty");
            }
            if (payloadObj instanceof String) {
                payloads = ((String) payloadObj).getBytes("UTF-8");
            } else if (payloadObj instanceof byte[]) {
                payloads = (byte[]) msg.getPayload();
            } else {
                throw new RuntimeException("the message format not supported");
            }
            msgBody = new String(payloads, "UTF-8");
        } catch (Exception e) {
            throw new RuntimeException("convert to RocketMQ message failed.", e);
        }
        return msgBody;
    }
}
